home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Language/OS - Multiplatform Resource Library
/
LANGUAGE OS.iso
/
et
/
et3_0-a1.lha
/
et3
/
src
/
StreamConnection.C
< prev
next >
Wrap
C/C++ Source or Header
|
1992-08-26
|
9KB
|
456 lines
#ifdef __GNUG__
#pragma implementation
#endif
#include "StreamConnection.h"
#include "Class.h"
#include "Error.h"
#include "String.h"
#include "MemBuf.h"
#include "OrdColl.h"
#include "CLib.h"
#include "Math.h"
#include "Env.h"
const int cHeaderSize= 20;
bool gIACDebug;
extern "C" void system_nonblock(int fd);
//---- Message -----------------------------------------------------------------
block::block()
{
len= cHeaderSize;
pos= 0;
buf= 0;
}
block::~block()
{
SafeDelete(buf);
}
Message::Message()
{
to= from= 0;
serial= 0;
}
Message::Message(int t, int f, int reply, char *buf, int l, bool makecopy)
{
to= t;
from= f;
serial= reply;
if (l < 0)
l= strlen(buf)+1;
data[1].len= l;
if (makecopy) {
data[1].buf= new char[l];
strncpy(data[1].buf, buf, l);
} else
data[1].buf= buf;
}
int Message::Read(int fd)
{
for (int i= 0; i < 2; i++) {
block *b= &data[i];
if (b->pos < b->len) {
if (b->buf == 0)
b->buf= new char[b->len];
int cnt= CLib::Read(fd, b->buf+b->pos, b->len-b->pos);
if (cnt == 0 || cnt == -1)
return -1;
if (cnt == -2)
return -2;
b->pos+= cnt;
if (b->pos == b->len) {
if (i == 1) {
return 0;
}
if (sscanf(b->buf, "%d %d %d %d",
&to, &from, &data[1].len, &serial) != 4)
return -1;
}
}
}
return 0;
}
int Message::Write(int fd)
{
for (int i= 0; i < 2; i++) {
block *b= &data[i];
while (b->pos < b->len) {
int wl= CLib::Write(fd, b->buf+b->pos, b->len-b->pos);
if (wl == -1) // fatal error
return -1;
if (wl == -2) // try again
return -2;
b->pos+= wl;
}
}
Print("Write", 0);
return 0; // completely written
}
void Message::Print(char *msg, char*)
{
if (gIACDebug) {
int l= data[1].len;
fprintf(stderr, "%s fr:%d to:%d ser:%d l:%d <", msg, from, to, serial, l);
if (l < 60) {
fprintf(stderr, "%s>\n", data[1].buf);
} else {
fwrite(data[1].buf, 1, 60, stderr);
fprintf(stderr, "...>\n");
}
}
}
//---- StreamConnection --------------------------------------------------------
NewMetaImpl0(StreamConnection, SysEvtHandler);
StreamConnection::StreamConnection(int f) : SysEvtHandler(f)
{
gIACDebug= Env::GetValue("IAC.Debug", FALSE);
serial= 100;
wqueue= 0;
queue= 0;
rm= 0;
sid= 0;
name= 0;
//SetName(gProgname);
error= !Open(f);
gSystem->AddWorkHandler(this);
}
StreamConnection::~StreamConnection()
{
error= TRUE;
CLib::Close(GetResourceId());
SafeDelete(name);
}
void StreamConnection::SetName(char *n)
{
SafeDelete(name);
name= strsave(n);
}
bool StreamConnection::Open(int fd)
{
if (fd > 0) {
SetResourceId(fd);
system_nonblock(fd);
SetResourceId(fd);
gSystem->AddFileInputHandler(this);
gSystem->AddFileOutputHandler(this);
error= FALSE;
return TRUE;
}
error= TRUE;
return FALSE;
}
int StreamConnection::Error(int code)
{
error= TRUE;
fprintf(stderr, "StreamConnection::Error: disconnecting from server; reason: %d\n", code);
return 0;
}
bool StreamConnection::Reconnect()
{
/*
if (error) {
int sock= gSystem->OpenConnection(SERVERNAME, SERVICENAME);
if (sock >= 0)
error= !Open(sock);
}
*/
return error;
}
void StreamConnection::SendMessage(Message *m)
{
if (m) {
SafeDelete(m->data[0].buf);
m->data[0].len= cHeaderSize;
m->data[0].pos= 0;
m->data[0].buf= new char[cHeaderSize];
m->data[1].pos= 0;
sprintf(m->data[0].buf, "%d %d %d %d", m->to, m->from, m->data[1].len, m->serial);
if (wqueue == 0)
wqueue= new OrdCollection;
wqueue->Add(m);
}
}
int StreamConnection::DrainOutput(int timeout)
{
if (Reconnect())
return -1;
// wait for output to drain
while (wqueue) {
if (gSystem->CanWrite(GetResourceId(), timeout)) {
fprintf(stderr, "DrainOutput: timeout\n");
return -1;
}
Notify(eSysEvtWrite, GetResourceId());
}
return 0;
}
int StreamConnection::WaitForMessage(int timeout)
{
if (Reconnect())
return -1;
if (gSystem->CanRead(GetResourceId(), timeout))
return -1;
Notify(eSysEvtRead, GetResourceId());
return 0;
}
Message *StreamConnection::WaitForReply(int seq, int timeout)
{
if (Reconnect())
return 0;
for (;;) {
if (queue && queue->Size() > 0) {
register Message *m;
Iter next(queue);
while (m= (Message*) next()) {
if (m->IsReply()) { // a reply
int s= m->serial;
queue->RemovePtr(m);
if (s == -seq) {
return m;
} else {
delete m;
fprintf(stderr, "lost sync\n");
return 0;
}
}
}
}
DrainOutput(4000);
if (WaitForMessage(timeout) == -1) {
fprintf(stderr, "CanRead: timeout\n");
return 0;
}
}
return 0;
}
bool StreamConnection::HasInterest(SysEventCodes code)
{
if (error)
return FALSE;
if (SysEvtHandler::HasInterest(code) == FALSE)
return FALSE;
if (code == eSysEvtWrite)
return wqueue != 0;
return TRUE;
}
void StreamConnection::Notify(SysEventCodes sec, int)
{
if (error)
return;
switch (sec) {
case eSysEvtWork:
if (queue && queue->Size() > 0) {
Message *m= (Message*) queue->RemoveFirst();
if (m)
Dispatch0(m);
}
break;
case eSysEvtRead:
if (rm == 0)
rm= new Message;
switch (rm->Read(GetResourceId())) {
case 0: // got one
if (queue == 0)
queue= new OrdCollection;
if (rm->from == 0)
rm->from= sid;
rm->Print("Received", 0);
queue->Add(rm);
rm= 0;
break;
case -1: // fatal error
Error(2);
break;
case -2: // try again later
break;
}
break;
case eSysEvtWrite:
if (wqueue && wqueue->Size() > 0) {
Message *m= (Message*) wqueue->First();
if (m) {
switch (m->Write(GetResourceId())) {
case 0: // completely written
if ((Message*)wqueue->RemovePtr(m) == m) {
SafeDelete(m);
if (wqueue->Size() <= 0)
SafeDelete(wqueue);
} else // inconsistent queue
Error(5);
break;
case -1: // fatal error
Error(4);
break;
case -2: // try again
break;
}
}
}
break;
}
}
void StreamConnection::Dispatch0(Message *m)
{
char req[200], *bp= req, *buf= m->data[1].buf, *cp, *retbuf= 0;
int retlen= -1, len= m->data[1].len;
cp= buf;
while (*cp && *cp != ' ')
*bp++= *cp++;
*bp= 0;
if (*cp == ' ')
cp++;
len-= (cp-buf);
bool rc= Dispatch2(req, cp, len, retbuf, retlen, m);
if (retbuf)
IntSendTo(m->from, -m->serial, 0, retbuf, retlen);
if (rc)
delete m;
}
bool StreamConnection::Dispatch2(char *req, char *buf,
int len, char *&retbuf, int &retlen, Message *m)
{
if (!m->IsReply()) // not a reply
Dispatch(m->from, req, buf, len, retbuf, retlen);
else
fprintf(stderr, "StreamConnection::Dispatch2: oops: got a reply!!\n");
return TRUE;
}
//---- old interface -----------------------------------------------------------
int StreamConnection::SendTo(int to, char *req, char *buf, int l)
{
return IntSendTo(to, serial++, req, buf, l);
}
int StreamConnection::Talk(int to, char *req, char *buf, int len,
char **retbuf, int *retlen)
{
if (Reconnect())
return -1;
int seq= serial++;
if (!IntSendTo(to, seq, req, buf, len))
return FALSE;
Message *m= WaitForReply(seq, 20000);
if (m) {
if (retbuf) {
*retbuf= m->data[1].buf;
m->data[1].buf= 0;
}
if (retlen)
*retlen= m->data[1].len;
delete m;
}
return TRUE;
}
void StreamConnection::Talk(char *prog, char *va_(fmt), ...)
{
va_list ap;
char *rbuf= 0;
int to;
va_start(ap, va_(fmt));
Talk(1, "start", prog, -1, &rbuf);
if (rbuf && sscanf(rbuf, "%d", &to) == 1) {
char buf[2000];
strcpy(buf, prog);
strcat(buf, ":");
vsprintf(&buf[strlen(buf)], va_(fmt), ap);
fprintf(stderr, "[%s]\n", buf);
Talk(to, buf);
}
va_end(ap);
}
int StreamConnection::IntSendTo(int to, int reply, char *req, char *buf, int l)
{
if (Reconnect())
return FALSE;
int rl= 0;
if (req)
rl= strlen(req);
if (buf) {
if (l < 0)
l= strlen(buf);
} else
l= 0;
int tl= l + rl + 1;
if (l > 0 && rl > 0)
tl++;
char *bp= new char[tl];
if (rl > 0)
strcpy(bp, req);
if (l > 0 && rl > 0)
strcat(bp, " ");
if (l > 0)
strcat(bp, buf);
SendMessage(new Message(to, 0, reply, bp, tl, FALSE));
return TRUE;
}
void StreamConnection::Dispatch(int, char*, char*, int, char *&, int&)
{
}